我们在工作中,为了保证环境的高可用,防止单点,Kafka 都是以集群的方式出现的,下面就带领大家一起搭建一套 Kafka 集群环境。
我们在官网下载 Kafka,下载地址为:http://kafka.apache.org/downloads,下载我们需要的版本,推荐使用稳定的版本。
搭建集群
①下载并解压
1 | cd /usr/local/src |
②修改配置文件
Kafka 的配置文件 $KAFKA_HOME/config/server.properties,主要修改一下下面几项:
1 | 确保每个机器上的id不一样 |
因为我自己是本机做实验,所有使用的是一个主机的不同端口,在线上,就是不同的机器,大家参考即可。
我们这里使用 Kafka 的 Zookeeper,只启动一个节点,但是正真的生产过程中,是需要 Zookeeper 集群,自己搭建就好,后期我们也会出 Zookeeper 的教程,大家请关注就好了。
③拷贝 3 份配置文件
1 | #创建对应的日志目录 |
④修改不同端口对应的文件
1 | #9092的id为0, 9093的id为1, 9094的id为2 |
修改 Zookeeper 的配置文件:
1 | dataDir=/data/servers/zookeeper |
然后创建 Zookeeper 的 myid 文件:
1 | echo "1"> /data/servers/zookeeper/myid |
⑤启动 Zookeeper
使用 Kafka 内置的 Zookeeper:
1 | cd /data/servers/kafka_2.11-2.4.0/bin |
启动 Kafka:
1 | ./kafka-server-start.sh -daemon ../config/server_9092.properties |
Kafka 的操作
①Topic
创建 Topic 常用的参数:
- create:创建 topic
- delete:删除 topic
- alter:修改 topic 的名字或者 partition 个数
- list:查看 topic
- describe:查看 topic 的详细信息
- topic <String: topic>:指定 topic 的名字
- zookeeper <String: hosts>:指定 Zookeeper 的连接地址参数提示并不赞成这样使用(DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.)
- bootstrap-server <String: server to connect to>:指定 Kafka 的连接地址,推荐使用这个,参数的提示信息显示(REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won’t be required.)。
- replication-factor <Integer: replication factor> : 对于每个 Partiton 的备份个数。(The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.)
- partitions <Integer: # of partitions>:指定该 topic 的分区的个数。
示例:
1 | cd /data/servers/kafka_2.11-2.4.0/bin |
②自动创建 Topic
我们在工作中,如果我们不想去管理 Topic,可以通过 Kafka 的配置文件来管理。
我们可以让 Kafka 自动创建 Topic,需要在我们的 Kafka 配置文件中加入如下配置文件:
1 | auto.create.topics.enable=true |
如果删除 Topic 想达到物理删除的目的,也是需要配置的:
1 | delete.topic.enable=true |
③发送消息
他们可以通过客户端的命令生产消息,先来看看 kafka-console-producer.sh 常用的几个参数吧:
- topic <String: topic>:指定 topic
- timeout <Integer: timeout_ms>:超时时间
- sync:异步发送消息
- broker-list <String: broker-list>:官网提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
这个参数是必须的:
1 | kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 |
④消费消息
我们也还是先来看看 kafka-console-consumer.sh 的参数吧:
- topic <String: topic>:指定 topic
- group <String: consumer group id>:指定消费者组
- from-beginning:指定从开始进行消费, 如果不指定, 就从当前进行消费
- bootstrap-server:Kafka 的连接地址
1 | kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning |
Kafka 的日志
Kafka 的日志分两种:
- 第一种日志是我们的 Kafka 的启动日志,就是我们排查问题,查看报错信息的日志。
- 第二种日志就是我们的数据日志,Kafka 是我们的数据是以日志的形式存在存盘中的,我们第二种所说的日志就是我们的 Partiton 与 Segment。
那我们就来说说备份和分区吧:我们创建一个分区,一个备份,那么 test 就应该在三台机器上或者三个数据目录只有一个 test-0。(分区的下标是从 0 开始的)
如果我们创建 N 个分区,我们就会在三个服务器上发现,test_0-n,如果我们创建 M 个备份,我们就会在发现,test_0 到 test_n 每一个都是 M 个。